{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wJcYs_ERTnnI"
      },
      "source": [
        "##### Copyright 2021 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "HMUDt0CiUJk9"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "77z2OchJTk0l"
      },
      "source": [
        "# Migrate the fault tolerance mechanism\n",
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://www.tensorflow.org/guide/migrate/fault_tolerance\">\n",
        "    <img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\" />\n",
        "    View on TensorFlow.org</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/migrate/fault_tolerance.ipynb\">\n",
        "    <img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\" />\n",
        "    Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/tensorflow/docs/blob/master/site/en/guide/migrate/fault_tolerance.ipynb\">\n",
        "    <img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" />\n",
        "    View source on GitHub</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a href=\"https://storage.googleapis.com/tensorflow_docs/docs/site/en/guide/migrate/fault_tolerance.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Download notebook</a>\n",
        "  </td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "n4O6fPyYTxZv"
      },
      "source": [
        "Fault tolerance refers to a mechanism of periodically saving the states of trackable objects, such as parameters and models. This enables you to recover  them in the event of a program/machine failure during training.\n",
        "\n",
        "This guide first demonstrates how to add fault tolerance to training with `tf.estimator.Estimator` in TensorFlow 1 by specifying metric saving with `tf.estimator.RunConfig`. Then, you will learn how to implement fault tolerance for training in Tensorflow 2 in two ways:\n",
        "\n",
        "- If you use the Keras `Model.fit` API, you can pass the `tf.keras.callbacks.BackupAndRestore` callback to it.\n",
        "- If you use a custom training loop (with `tf.GradientTape`), you can arbitrarily save checkpoints using the `tf.train.Checkpoint` and `tf.train.CheckpointManager` APIs.\n",
        "\n",
        "Both of these methods will back up and restore the training states in [checkpoint](../../guide/checkpoint.ipynb) files.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pHJfmkCFUhQf"
      },
      "source": [
        "## Setup"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TOVQubuDzdmA"
      },
      "source": [
        "Install `tf-nightly`, as the frequency of checkpoint saving at a particular step with the `save_freq` argument in `tf.keras.callbacks.BackupAndRestore` is introduced from TensorFlow 2.10:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "pGW0XhXkxY_q"
      },
      "outputs": [],
      "source": [
        "!pip install tf-nightly"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "VXnPvQi8Ui1F"
      },
      "outputs": [],
      "source": [
        "import tensorflow.compat.v1 as tf1\n",
        "import tensorflow as tf\n",
        "import numpy as np\n",
        "import tempfile\n",
        "import time"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Tww-uIoiUlsT"
      },
      "outputs": [],
      "source": [
        "mnist = tf.keras.datasets.mnist\n",
        "\n",
        "(x_train, y_train),(x_test, y_test) = mnist.load_data()\n",
        "x_train, x_test = x_train / 255.0, x_test / 255.0"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TtlucRG_Uro_"
      },
      "source": [
        "## TensorFlow 1: Save checkpoints with `tf.estimator.RunConfig`\n",
        "\n",
        "In TensorFlow 1, you can configure a `tf.estimator` to save checkpoints every step by configuring `tf.estimator.RunConfig`.\n",
        "\n",
        "In this example, start by writing a hook that artificially throws an error during the fifth checkpoint:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Q8shCkV2jKcc"
      },
      "outputs": [],
      "source": [
        "class InterruptHook(tf1.train.SessionRunHook):\n",
        "  # A hook for artificially interrupting training.\n",
        "  def begin(self):\n",
        "    self._step = -1\n",
        "\n",
        "  def before_run(self, run_context):\n",
        "    self._step += 1\n",
        "\n",
        "  def after_run(self, run_context, run_values):\n",
        "    if self._step == 5:\n",
        "      raise RuntimeError('Interruption')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ZXbQ6cFlkoIM"
      },
      "source": [
        "Next, configure `tf.estimator.Estimator` to save every checkpoint and use the MNIST dataset:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "1EKXzi4Qj2Eb"
      },
      "outputs": [],
      "source": [
        "feature_columns = [tf1.feature_column.numeric_column(\"x\", shape=[28, 28])]\n",
        "config = tf1.estimator.RunConfig(save_summary_steps=1,\n",
        "                                 save_checkpoints_steps=1)\n",
        "\n",
        "path = tempfile.mkdtemp()\n",
        "\n",
        "classifier = tf1.estimator.DNNClassifier(\n",
        "    feature_columns=feature_columns,\n",
        "    hidden_units=[256, 32],\n",
        "    optimizer=tf1.train.AdamOptimizer(0.001),\n",
        "    n_classes=10,\n",
        "    dropout=0.2,\n",
        "    model_dir=path,\n",
        "    config = config\n",
        ")\n",
        "\n",
        "train_input_fn = tf1.estimator.inputs.numpy_input_fn(\n",
        "    x={\"x\": x_train},\n",
        "    y=y_train.astype(np.int32),\n",
        "    num_epochs=10,\n",
        "    batch_size=50,\n",
        "    shuffle=True,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "sGP7Nyenk1gr"
      },
      "source": [
        "Begin training the model. An artificial exception will be raised by the hook you defined earlier."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xWKMsmt6jYSL"
      },
      "outputs": [],
      "source": [
        "try:\n",
        "  classifier.train(input_fn=train_input_fn,\n",
        "                   hooks=[InterruptHook()],\n",
        "                   max_steps=10)\n",
        "except Exception as e:\n",
        "  print(f'{type(e).__name__}:{e}')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DekxJkgWk-4N"
      },
      "source": [
        "Rebuild the `tf.estimator.Estimator` using the last saved checkpoint and continue training:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "vqMVTiJMjcH7"
      },
      "outputs": [],
      "source": [
        "classifier = tf1.estimator.DNNClassifier(\n",
        "    feature_columns=feature_columns,\n",
        "    hidden_units=[256, 32],\n",
        "    optimizer=tf1.train.AdamOptimizer(0.001),\n",
        "    n_classes=10,\n",
        "    dropout=0.2,\n",
        "    model_dir=path,\n",
        "    config = config\n",
        ")\n",
        "classifier.train(input_fn=train_input_fn,\n",
        "                   max_steps = 10)"
      ]
    },
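    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As an optional sanity check (a sketch that reuses only objects defined above), you can evaluate the restored classifier on the test split to confirm that training resumed from the saved checkpoint rather than from scratch:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# A sketch: evaluate the restored estimator on the MNIST test split.\n",
        "eval_input_fn = tf1.estimator.inputs.numpy_input_fn(\n",
        "    x={\"x\": x_test},\n",
        "    y=y_test.astype(np.int32),\n",
        "    num_epochs=1,\n",
        "    batch_size=50,\n",
        "    shuffle=False,\n",
        ")\n",
        "print(classifier.evaluate(input_fn=eval_input_fn))"
      ]
    },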
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "T5LtVtmvYx7J"
      },
      "source": [
        "## TensorFlow 2: Back up and restore with a callback and `Model.fit`\n",
        "\n",
        "In TensorFlow 2, if you use the Keras `Model.fit` API for training, you can provide the `tf.keras.callbacks.BackupAndRestore` callback to add the fault tolerance functionality.\n",
        "\n",
        "To help demonstrate this, first start by defining a Keras `Callback` class that artificially throws an error during the fourth epoch checkpoint:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Ci3yB6A5lwJu"
      },
      "outputs": [],
      "source": [
        "class InterruptAtEpoch(tf.keras.callbacks.Callback):\n",
        "  # A callback for artificially interrupting training.\n",
        "  def __init__(self, interrupting_epoch=3):\n",
        "    self.interrupting_epoch = interrupting_epoch\n",
        "\n",
        "  def on_epoch_end(self, epoch, log=None):\n",
        "    if epoch == self.interrupting_epoch:\n",
        "      raise RuntimeError('Interruption')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "AhU3VTYZoDh-"
      },
      "source": [
        "Then, define and instantiate a simple Keras model, define the loss function, call `Model.compile`, and set up a `tf.keras.callbacks.BackupAndRestore` callback that will save the checkpoints in a temporary directory at epoch boundaries:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "1VOQLDNkl2bl"
      },
      "outputs": [],
      "source": [
        "def create_model():\n",
        "  return tf.keras.models.Sequential([\n",
        "    tf.keras.layers.Flatten(input_shape=(28, 28)),\n",
        "    tf.keras.layers.Dense(512, activation='relu'),\n",
        "    tf.keras.layers.Dropout(0.2),\n",
        "    tf.keras.layers.Dense(10)\n",
        "  ])\n",
        "loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)\n",
        "model = create_model()\n",
        "model.compile(optimizer='adam',\n",
        "              loss=loss,\n",
        "              metrics=['accuracy'])\n",
        "log_dir = tempfile.mkdtemp()\n",
        "backup_restore_callback = tf.keras.callbacks.BackupAndRestore(\n",
        "    backup_dir = log_dir)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LRRWmZqsvMrq"
      },
      "source": [
        "Start training the model with `Model.fit`. During training, checkpoints will be saved thanks to `tf.keras.callbacks.BackupAndRestore` instantiated above, while the `InterruptAtEpoch` class will raise an artificial exception to simulate a failure after the fourth epoch."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "8bVO79qWl4Uv"
      },
      "outputs": [],
      "source": [
        "try:\n",
        "  model.fit(x=x_train,\n",
        "            y=y_train,\n",
        "            epochs=10,\n",
        "            steps_per_epoch=100,\n",
        "            validation_data=(x_test, y_test),\n",
        "            callbacks=[backup_restore_callback, InterruptAtEpoch()])\n",
        "except Exception as e:\n",
        "  print(f'{type(e).__name__}:{e}')"
      ]
    },
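    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a quick sanity check (a sketch, not part of the original workflow), you can list the backup directory to confirm that the callback wrote its checkpoint files there before the interruption:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# A sketch: the BackupAndRestore callback stores its temporary\n",
        "# checkpoint files under `backup_dir` (here, `log_dir`).\n",
        "print(tf.io.gfile.listdir(log_dir))"
      ]
    },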
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EWidh234vcRf"
      },
      "source": [
        "Next, instantiate the Keras model, call `Model.compile`, and continue training the model with `Model.fit` from a previously saved checkpoint:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "3IWPH0Cmn2wi"
      },
      "outputs": [],
      "source": [
        "model = create_model()\n",
        "model.compile(optimizer='adam',\n",
        "              loss=loss,\n",
        "              metrics=['accuracy'],\n",
        "              steps_per_execution=10)\n",
        "model.fit(x=x_train,\n",
        "            y=y_train,\n",
        "            epochs=10,\n",
        "            steps_per_epoch=100,\n",
        "            validation_data=(x_test, y_test),\n",
        "            callbacks=[backup_restore_callback])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nP2dnpMPxtYj"
      },
      "source": [
        "Define another `Callback` class that artificially throws an error during the 140th step:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "YardkAaBxr-c"
      },
      "outputs": [],
      "source": [
        "class InterruptAtStep(tf.keras.callbacks.Callback):\n",
        "  # A callback for artificially interrupting training.\n",
        "  def __init__(self, interrupting_step=140):\n",
        "    self.total_step_count = 0\n",
        "    self.interrupting_step = interrupting_step\n",
        "\n",
        "  def on_batch_begin(self, batch, logs=None):\n",
        "    self.total_step_count += 1\n",
        "\n",
        "  def on_batch_end(self, batch, logs=None):\n",
        "    if self.total_step_count == self.interrupting_step:\n",
        "      print(\"\\nInterrupting at step count\", self.total_step_count)\n",
        "      raise RuntimeError('Interruption')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Af3VpehxyTpb"
      },
      "source": [
        "Note: This section uses features that are only available in `tf-nightly` until Tensorflow 2.10 is released.\n",
        "\n",
        "To make sure the checkpoints are saved every 30 steps, set the `save_freq` in the `BackupAndRestore` callback to `30`. The `InterruptAtStep` will raise an artificial exception to simulate a failure at epoch 1 and step 40 (total step count 140). The checkpoint would be last saved at epoch 1 and step 20."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "dHHCENDPyUHS"
      },
      "outputs": [],
      "source": [
        "log_dir_2 = tempfile.mkdtemp()\n",
        "\n",
        "backup_restore_callback = tf.keras.callbacks.BackupAndRestore(\n",
        "    backup_dir = log_dir_2, save_freq=30\n",
        ")\n",
        "model = create_model()\n",
        "model.compile(optimizer='adam',\n",
        "              loss=loss,\n",
        "              metrics=['accuracy'])\n",
        "try:\n",
        "  model.fit(x=x_train,\n",
        "            y=y_train,\n",
        "            epochs=10,\n",
        "            steps_per_epoch=100,\n",
        "            validation_data=(x_test, y_test),\n",
        "            callbacks=[backup_restore_callback, InterruptAtStep()])\n",
        "except Exception as e:\n",
        "  print(f'{type(e).__name__}:{e}')"
      ]
    },
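    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To see why the restore point lands where it does, here is the checkpoint arithmetic as a small sketch (the numbers come from the configuration above: `save_freq=30`, `steps_per_epoch=100`, and an interruption at total step 140):"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# A sketch of the checkpoint arithmetic: the last save happens at the\n",
        "# largest multiple of `save_freq` not exceeding the interrupting step.\n",
        "save_freq, steps_per_epoch, interrupting_step = 30, 100, 140\n",
        "last_saved_step = (interrupting_step // save_freq) * save_freq  # 120\n",
        "print(f\"Last checkpoint at total step {last_saved_step}: \"\n",
        "      f\"epoch {last_saved_step // steps_per_epoch}, \"\n",
        "      f\"step {last_saved_step % steps_per_epoch}\")"
      ]
    },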
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2-ggMFEHynMR"
      },
      "source": [
        "Next, instantiate the Keras model, call `Model.compile`, and continue training the model with `Model.fit` from a previously saved checkpoint. Notice that the training starts from epoch 2 and step 21."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "vT7Kx30NEqly"
      },
      "outputs": [],
      "source": [
        "model = create_model()\n",
        "model.compile(optimizer='adam',\n",
        "              loss=loss,\n",
        "              metrics=['accuracy'],\n",
        "              steps_per_execution=10)\n",
        "model.fit(x=x_train,\n",
        "            y=y_train,\n",
        "            epochs=10,\n",
        "            steps_per_epoch=100,\n",
        "            validation_data=(x_test, y_test),\n",
        "            callbacks=[backup_restore_callback])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OdWexHUUaEB6"
      },
      "source": [
        "## TensorFlow 2: Write manual checkpoints with a custom training loop\n",
        "\n",
        "If you use a custom training loop in TensorFlow 2, you can implement a fault tolerance mechanism with the `tf.train.Checkpoint` and `tf.train.CheckpointManager` APIs.\n",
        "\n",
        "This example demonstrates how to:\n",
        "\n",
        "- Use a `tf.train.Checkpoint` object to manually create a checkpoint, where the trackable objects you want to save are set as attributes.\n",
        "- Use a `tf.train.CheckpointManager` to manage multiple checkpoints.\n",
        "\n",
        "Start by defining and instantiating the Keras model, the optimizer, and the loss function. Then, create a `Checkpoint` that manages two objects with trackable states (the model and the optimizer), as well as a `CheckpointManager` for logging and keeping several checkpoints in a temporary directory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "hPnIRKC8aDwE"
      },
      "outputs": [],
      "source": [
        "model = create_model()\n",
        "optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)\n",
        "loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)\n",
        "log_dir = tempfile.mkdtemp()\n",
        "epochs = 5\n",
        "steps_per_epoch = 5\n",
        "\n",
        "checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)\n",
        "checkpoint_manager = tf.train.CheckpointManager(\n",
        "            checkpoint, log_dir, max_to_keep=2)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "L2tK4fm6xNkJ"
      },
      "source": [
        "Now, implement a custom training loop where after the first epoch every time a new epoch starts the last checkpoint is loaded:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "GhQphF5jxPWU"
      },
      "outputs": [],
      "source": [
        "for epoch in range(epochs):\n",
        "  if epoch > 0:\n",
        "      tf.train.load_checkpoint(save_path)\n",
        "  print(f\"\\nStart of epoch {epoch}\")\n",
        "\n",
        "  for step in range(steps_per_epoch):\n",
        "    with tf.GradientTape() as tape:\n",
        "\n",
        "      logits = model(x_train, training=True)\n",
        "      loss_value = loss_fn(y_train, logits)\n",
        "\n",
        "      grads = tape.gradient(loss_value, model.trainable_weights)\n",
        "      optimizer.apply_gradients(zip(grads, model.trainable_weights))\n",
        "\n",
        "    save_path = checkpoint_manager.save()\n",
        "    print(f\"Checkpoint saved to {save_path}\")\n",
        "    print(f\"Training loss at step {step}: {loss_value}\")"
      ]
    },
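    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "After a real failure, the Python process is gone, so recovery means rebuilding the objects and restoring their states from disk. Below is a minimal restart sketch, assuming the same `log_dir` as above; `create_model` and the optimizer settings mirror the earlier cell:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# A minimal restart sketch: rebuild the trackable objects, then restore\n",
        "# the latest state written by the CheckpointManager above.\n",
        "restored_model = create_model()\n",
        "restored_optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)\n",
        "restored_checkpoint = tf.train.Checkpoint(\n",
        "    model=restored_model, optimizer=restored_optimizer)\n",
        "restored_checkpoint.restore(\n",
        "    tf.train.latest_checkpoint(log_dir)).assert_existing_objects_matched()"
      ]
    },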
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "rQUS8nO9FZlH"
      },
      "source": [
        "## Next steps\n",
        "\n",
        "To learn more about fault tolerance and checkpointing in TensorFlow 2, consider the following documentation:\n",
        "\n",
        "- The `tf.keras.callbacks.BackupAndRestore` callback API docs.\n",
        "- The `tf.train.Checkpoint` and `tf.train.CheckpointManager` API docs.\n",
        "- The [Training checkpoints](../../guide/checkpoint.ipynb) guide, including the _Writing checkpoints_ section.\n",
        "\n",
        "You may also find the following material related to [distributed training](../..guide/distributed_training.ipynb) useful:\n",
        "\n",
        "- The _Fault tolerance_ section in the [Multi-worker training with Keras](../../tutorials/distribute/multi_worker_with_keras.ipynb) tutorial.\n",
        "- The _Handing task failure_ section in the [Parameter server training](../../tutorials/distribute/parameter_server_training.ipynb) tutorial."
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "fault_tolerance.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
